home *** CD-ROM | disk | FTP | other *** search
- /*
- queue_library.c --- queue library interface.
-
- Copyright (c) 1995 SHW Wabnitz
- Written by Bernhard Fastenrath (fasten@shw.com)
-
- This file may be distributed under the terms
- of the GNU General Public License.
- */
-
- #if defined (__GNUC__)
- #include <stabs.h>
- #endif
-
- #include "queue_library.h"
-
- struct ExecBase *SysBase = NULL;
- struct Library *QueueBase = NULL;
- Semaphore QueuesSemaphore;
- List Queues;
-
- #if defined (__GNUC__)
- const BYTE LibName[] = "queue.library";
- const BYTE LibIdString[] = "$VER: queue.library 3.1 (12-3-96)";
- const UWORD LibVersion = 3;
- const UWORD LibRevision = 1;
- #endif
-
/*
 * Compiler abstraction for the library entry points.
 *
 *   LIBRT         qualifiers placed between return type and function name
 *   REG(regname)  register binding of a parameter
 *   STRUCT_MYLIB  type of the library base handed to __UserLibInit() and
 *                 __UserLibCleanup()
 *
 * For GCC all qualifiers expand to nothing and the ADDTABL_* macros are not
 * defined here (the file includes <stabs.h> for GCC builds).  For SAS/C the
 * ADDTABL_* macros are defined locally and expand to a lone semicolon.
 */
#if defined (__GNUC__)
#define LIBRT
#define REG(regname)
#define STRUCT_MYLIB struct Library
#elif defined (__SASC)
#define LIBRT __saveds __asm
#define REG(regname) register __ ## regname
#define ADDTABL_1(name,arg1);
#define ADDTABL_2(name,arg1,arg2);
#define ADDTABL_3(name,arg1,arg2,arg3);
#define ADDTABL_END();
#define STRUCT_MYLIB struct MyLibrary
#else
/* Without these macros every entry point below fails to parse; stop early
   with a message that names the real problem. */
#error "queue_library.c: unsupported compiler (expected __GNUC__ or __SASC)"
#endif
-
- /*** configuration ***/
-
#if defined (SERIALIZE_WITH_FORBID)
/* Build option: replace every semaphore lock/unlock in this file by a
   Forbid()/Permit() pair.  The semaphore argument is ignored, and the
   InitSemaphore() calls are compiled out elsewhere in the file. */
#define ObtainSemaphore(s) Forbid()
#define ReleaseSemaphore(s) Permit()
#endif /* SERIALIZE_WITH_FORBID */
-
- /*** internal functions ***/
-
- #if defined (__GNUC__)
- static ULONG
- strlen (char *name)
- {
- ULONG t = 0;
-
- while (name[t])
- t++;
- return t;
- }
- #endif
-
- static void
- SetMarker (QueueHandle *qh, QMessage *msg)
- {
- QueueNode *qn = qh -> qh_QNode;
-
- if (qh -> qh_un.qhl.qhl_MinNode.mln_Succ)
- Remove ((Node *) &qh -> qh_un.qhl.qhl_MinNode);
- Insert (&qn -> qn_List, (Node *) &qh -> qh_un.qhl.qhl_MinNode, (Node *) msg);
- }
-
- static void
- ClearMarker (QueueHandle *qh)
- {
- Remove ((Node *) &qh -> qh_un.qhl.qhl_MinNode);
- qh -> qh_un.qhl.qhl_MinNode.mln_Succ = NULL;
- }
-
- static void
- RemoveAndReply (QMessage *msg)
- {
- QueueHandle *qh;
-
- if (!msg -> qm_Refs)
- {
- qh = (QueueHandle *) msg -> qm_Owner;
- Remove ((Node *) msg);
- /*** reply to owner ***/
- /* ObtainSemaphore (); */
- AddHead (&qh -> qh_un.qhs.qhs_ReplyList, (Node *) msg);
- Signal (qh -> qh_SigTask, qh -> qh_SigMask);
- /* ReleaseSemaphore (); */
- }
- else
- msg -> qm_Status = QMS_REMOVED;
- }
-
- static void
- ReplyQMessage (QueueNode *qn, QMessage *msg)
- {
- msg -> qm_Refs --;
- msg -> qm_Replies ++;
-
- if (msg -> qm_Replies >= qn -> qn_Read || msg -> qm_Status == QMS_REMOVED)
- RemoveAndReply (msg);
- }
-
- /*** library interface ***/
-
- int LIBRT
- __UserLibInit (REG(a6) STRUCT_MYLIB *libbase)
- {
- SysBase = *(struct ExecBase **)4;
- QueueBase = (struct Library *) libbase;
-
- #ifndef SERIALIZE_WITH_FORBID
- InitSemaphore (&QueuesSemaphore);
- #endif
- NewList (&Queues);
- return 0; /* success */
- }
-
- void LIBRT
- __UserLibCleanup (REG(a6) STRUCT_MYLIB *libbase)
- {
- }
-
- ADDTABL_3(LIBQOpen,a0,d0,d1);
-
- QHandle LIBRT
- LIBQOpen (REG(a0) STRPTR name, REG(d0) ULONG mode, REG(d1) ULONG sigbit)
- {
- QueueHandle *qh;
- QueueNode *qn;
- ULONG len;
-
- if (!(qh = (QueueHandle *) AllocMem (sizeof (QueueHandle), MEMF_PUBLIC | MEMF_CLEAR)))
- return NULL;
-
- ObtainSemaphore (&QueuesSemaphore);
-
- if (!(qn = (QueueNode *) FindName (&Queues, name)))
- {
- if (!(qn = AllocMem (sizeof (QueueNode), MEMF_PUBLIC | MEMF_CLEAR)))
- {
- ReleaseSemaphore (&QueuesSemaphore);
- FreeMem (qh, sizeof (QueueHandle));
- return NULL;
- }
- #ifndef SERIALIZE_WITH_FORBID
- InitSemaphore (&qn -> qn_Semaphore);
- #endif
- ObtainSemaphore (&qn -> qn_Semaphore);
-
- len = strlen (name) + 1;
- if (!(qn -> qn_Node.ln_Name = AllocMem (len, MEMF_PUBLIC)))
- {
- ReleaseSemaphore (&qn -> qn_Semaphore);
- ReleaseSemaphore (&QueuesSemaphore);
- FreeMem (qh, sizeof (QueueHandle));
- FreeMem (qn, sizeof (QueueNode));
- return NULL;
- }
- CopyMem (name, qn -> qn_Node.ln_Name, len);
-
- AddHead (&Queues, (Node *) qn);
- NewList (&qn -> qn_Handles);
- NewList (&qn -> qn_List);
- qn -> qn_Refs = 1;
- }
- else
- {
- ObtainSemaphore (&qn -> qn_Semaphore);
- qn -> qn_Refs ++;
- }
- AddHead (&qn -> qn_Handles, (Node *) qh);
- qh -> qh_Mode = mode;
- qh -> qh_QNode = qn;
- qh -> qh_SigMask = 1 << sigbit;
- qh -> qh_SigTask = FindTask (0);
- if (mode == QMODE_LISTEN)
- {
- qn -> qn_Read ++;
- qh -> qh_un.qhl.qhl_Status = QMS_MARKER;
- Signal (qh -> qh_SigTask, qh -> qh_SigMask);
- }
- else
- {
- NewList (&qh -> qh_un.qhs.qhs_ReplyList);
- }
- ReleaseSemaphore (&qn -> qn_Semaphore);
- ReleaseSemaphore (&QueuesSemaphore);
- return (QHandle) qh;
- }
-
- ADDTABL_1(LIBQClose,a0);
-
- ULONG LIBRT
- LIBQClose (REG(a0) QHandle qhandle)
- {
- QueueNode *qn = ((QueueHandle *) qhandle) -> qh_QNode;
- QueueHandle *qh = qhandle;
- QMessage *cmsg, *msg, *next;
- ULONG count;
-
- /* to avoid a possible deadlock with QOpen() the QueuesSemaphore
- has to be locked before the QNode's semaphore here.
- */
- ObtainSemaphore (&QueuesSemaphore);
- ObtainSemaphore (&qn -> qn_Semaphore);
-
- if (qh -> qh_Mode == QMODE_SEND)
- {
- if (count = qh -> qh_un.qhs.qhs_MsgCount)
- {
- ReleaseSemaphore (&QueuesSemaphore);
- ReleaseSemaphore (&qn -> qn_Semaphore);
- return count;
- }
- }
- else
- {
- if (cmsg = (QMessage *) qh -> qh_un.qhl.qhl_MinNode.mln_Succ)
- {
- ClearMarker (qh);
- next = (QMessage *) cmsg -> qm_MinNode.mln_Pred;
- }
- else if (cmsg = qh -> qh_un.qhl.qhl_Message)
- {
- cmsg -> qm_Refs --;
- cmsg -> qm_Replies ++;
- next = cmsg;
- }
- if (next)
- {
- for (msg = next; next = (QMessage *) msg -> qm_MinNode.mln_Pred;
- msg = next)
- {
- /* Replies from the current task
- don't count after QClose().
- */
- if (msg -> qm_Status == QMS_ACTIVE || msg -> qm_Status == QMS_REMOVED)
- msg -> qm_Replies --;
- }
- }
- else
- cmsg = (QMessage *) qn -> qn_List.lh_Head;
-
- for (msg = cmsg; next = (QMessage *) msg -> qm_MinNode.mln_Succ; msg = next)
- {
- /* These messages might be in the queue, waiting only
- for the current task; let's give them a chance.
- */
- if ((msg -> qm_Status == QMS_ACTIVE && msg -> qm_Replies >= qn -> qn_Read)
- || msg -> qm_Status == QMS_REMOVED)
- {
- RemoveAndReply (msg);
- }
- if (msg -> qm_Status == QMS_MARKER)
- break;
- }
- qn -> qn_Read --;
- }
- Remove ((Node *) qh);
- FreeMem (qh, sizeof (QueueHandle));
-
- if (! -- qn -> qn_Refs)
- {
- ReleaseSemaphore (&qn -> qn_Semaphore);
- Remove ((Node *) qn);
- FreeMem (qn -> qn_Node.ln_Name, strlen (qn -> qn_Node.ln_Name) + 1);
- FreeMem (qn, sizeof (QueueNode));
- }
- else
- ReleaseSemaphore (&qn -> qn_Semaphore);
-
- ReleaseSemaphore (&QueuesSemaphore);
- return 0;
- }
-
- ADDTABL_2(LIBQAddMsg,a0,a1);
-
- void LIBRT
- LIBQAddMsg (REG(a0) QHandle qhandle, REG(a1) QMessage *msg)
- {
- QueueNode *qn = ((QueueHandle *) qhandle) -> qh_QNode;
- QueueHandle *qh = (QueueHandle *) qhandle;
-
- msg -> qm_Owner = qhandle;
- msg -> qm_Refs = 0;
- msg -> qm_Replies = 0;
- msg -> qm_Status = QMS_ACTIVE;
- ObtainSemaphore (&qn -> qn_Semaphore);
- AddTail (&qn -> qn_List, (Node *) msg);
- qh -> qh_un.qhs.qhs_MsgCount ++;
- if (qn -> qn_Read)
- {
- for (qh = (QueueHandle *) qn -> qn_Handles.lh_Head; qh -> qh_MinNode.mln_Succ;
- qh = (QueueHandle *) qh -> qh_MinNode.mln_Succ)
- {
- if (qh -> qh_Mode == QMODE_LISTEN)
- Signal (qh -> qh_SigTask, qh -> qh_SigMask);
- }
- }
- else
- RemoveAndReply (msg);
- ReleaseSemaphore (&qn -> qn_Semaphore);
- }
-
- ADDTABL_2(LIBQRemMsg,a0,a1);
-
- void LIBRT
- LIBQRemMsg (REG(a0) QHandle qhandle, REG(a1) QMessage *msg)
- {
- QueueNode *qn = ((QueueHandle *) qhandle) -> qh_QNode;
-
- ObtainSemaphore (&qn -> qn_Semaphore);
- RemoveAndReply (msg);
- ReleaseSemaphore (&qn -> qn_Semaphore);
- }
-
- ADDTABL_1(LIBQGetMsg,a0);
-
- QMessage * LIBRT
- LIBQGetMsg (REG(a0) QHandle qhandle)
- {
- QueueNode *qn = ((QueueHandle *) qhandle) -> qh_QNode;
- QueueHandle *qh = (QueueHandle *) qhandle;
- QMessage *msg, *next;
-
- ObtainSemaphore (&qn -> qn_Semaphore);
-
- /* A server ( QMODE_SEND ) retrieves a replied message */
-
- if (qh -> qh_Mode == QMODE_SEND)
- {
- if (msg = (QMessage *) RemTail (&qh -> qh_un.qhs.qhs_ReplyList))
- qh -> qh_un.qhs.qhs_MsgCount --;
- ReleaseSemaphore (&qn -> qn_Semaphore);
- return msg;
- }
-
- /* A client ( QMODE_LISTEN ) reads a message */
-
- if (msg = qh -> qh_un.qhl.qhl_Message)
- {
- /* automagically reply the current message */
-
- next = (QMessage *) msg -> qm_MinNode.mln_Succ;
- ReplyQMessage (qn, msg);
- msg = next;
- }
- else /* no current message, start at marker or list head */
- {
- if (msg = (QMessage *) qh -> qh_un.qhl.qhl_MinNode.mln_Succ)
- {
- ClearMarker (qh);
- }
- else
- msg = (QMessage *) qn -> qn_List.lh_Head;
- }
- for (;;) /* find the next message */
- {
- if (!(next = (QMessage *) msg -> qm_MinNode.mln_Succ))
- {
- SetMarker (qh, (QMessage *) msg -> qm_MinNode.mln_Pred);
- msg = NULL; /* no message available */
- break;
- }
- if (msg -> qm_Status & QMS_INACTIVE)
- {
- if (msg -> qm_Status == QMS_REMOVED)
- RemoveAndReply (msg);
- msg = next;
- continue;
- }
- msg -> qm_Refs ++;
- break;
- }
- qh -> qh_un.qhl.qhl_Message = msg;
- ReleaseSemaphore (&qn -> qn_Semaphore);
- return msg;
- }
-
- ADDTABL_1(LIBQReplyMsg,a0);
-
- ULONG LIBRT
- LIBQReplyMsg (REG(a0) QHandle qhandle)
- {
- QueueNode *qn = ((QueueHandle *) qhandle) -> qh_QNode;
- QueueHandle *qh = (QueueHandle *) qhandle;
- QMessage *msg, *next;
-
- if (!(msg = qh -> qh_un.qhl.qhl_Message))
- return 0;
-
- ObtainSemaphore (&qn -> qn_Semaphore);
-
- SetMarker (qh, msg);
-
- next = (QMessage *) msg -> qm_MinNode.mln_Succ;
- ReplyQMessage (qn, msg);
-
- if (next)
- {
- for (msg = next; next = (QMessage *) msg -> qm_MinNode.mln_Succ; msg = next)
- {
- if (msg -> qm_Status == QMS_ACTIVE)
- {
- /* Since QReplyMsg() means you're not going to read more messages now
- it's probably a good idea to remind you that there's more.
- */
- Signal (qh -> qh_SigTask, qh -> qh_SigMask);
- break;
- }
- }
- }
- qh -> qh_un.qhl.qhl_Message = NULL;
- ReleaseSemaphore (&qn -> qn_Semaphore);
- return 1;
- }
-
- ADDTABL_1(LIBQFlush,a0);
-
- ULONG LIBRT
- LIBQFlush (REG(a0) QHandle qhandle)
- {
- QueueNode *qn = ((QueueHandle *) qhandle) -> qh_QNode;
- QMessage *msg, *next;
-
- ObtainSemaphore (&qn -> qn_Semaphore);
- msg = (QMessage *) qn -> qn_List.lh_Head;
- while (next = (QMessage *) msg -> qm_MinNode.mln_Succ)
- {
- if (msg -> qm_Status != QMS_MARKER)
- RemoveAndReply (msg);
- msg = next;
- }
- ReleaseSemaphore (&qn -> qn_Semaphore);
- return 1;
- }
-
- /* new in 3.0 */
-
- ADDTABL_1(LIBQAllocMsg,d0);
-
- QMessage * LIBRT
- LIBQAllocMsg (REG(d0) ULONG size)
- {
- QMessage *msg;
-
- if (!(msg = AllocMem (sizeof (QMessage), MEMF_PUBLIC | MEMF_CLEAR)))
- return NULL;
- if (size)
- {
- if (!(msg -> qm_Data = AllocMem (size, MEMF_SHARED_READ)))
- {
- FreeMem (msg, sizeof (QMessage));
- return NULL;
- }
- }
- return msg;
- }
-
- ADDTABL_2(LIBQFreeMsg,a0,d0);
-
- void LIBRT
- LIBQFreeMsg (REG(a0) QMessage *msg, REG(d0) ULONG size)
- {
- if (msg -> qm_Data)
- FreeMem (msg -> qm_Data, size);
- FreeMem (msg, sizeof (QMessage));
- }
-
- ADDTABL_END();
-